kafka auto.offset.reset latest earliest 详解

您所在的位置:网站首页 kafka has no active members kafka auto.offset.reset latest earliest 详解

kafka auto.offset.reset latest earliest 详解

#kafka auto.offset.reset latest earliest 详解| 来源: 网络整理| 查看: 265

auto.offset.reset关乎kafka数据的读取,是一个非常重要的设置。常用的二个值是latest和earliest,默认是latest。

 

一,latest和earliest区别

1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费

2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据

提交过offset,latest和earliest没有区别,但是在没有提交offset情况下,用latest直接会导致无法读取旧数据。

二,创建topic

查看复制打印? # bin/kafka-topics.sh --create --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --replication-factor 2 --partitions 3 --topic tank   Created topic "tank".      # bin/kafka-topics.sh --describe --zookeeper bigserver1:2181,bigserver2:2181,testing:2181 --topic tank   Topic:tank PartitionCount:3 ReplicationFactor:2 Configs:    Topic: tank Partition: 0 Leader: 0 Replicas: 0,2 Isr: 0,2    Topic: tank Partition: 1 Leader: 1 Replicas: 1,0 Isr: 1,0    Topic: tank Partition: 2 Leader: 2 Replicas: 2,1 Isr: 2,1  

三,生产数据和接收生产数据

[root@bigserver1 kafka]# bin/kafka-console-producer.sh --broker-list bigserver1:9092,bigserver2:9092,testing:9092 --topic tank   >1   >2   >3   >4   >5   >6   。。。。。。。。。省略。。。。。。。。。   [root@bigserver1 kafka]# bin/kafka-console-consumer.sh --bootstrap-server bigserver1:9092,bigserver2:9092,testing:9092 --topic tank --from-beginning   1   2   3   4   5   6   。。。。。。。。省略。。。。。。。。  

四,测试代码

查看复制打印? object tank {       def main(args: Array[String]): Unit = {           val pros: Properties = new Properties           pros.put("bootstrap.servers", "bigserver1:9092,bigserver2:9092,testing:9092")           /*分组由消费者决定,完全自定义,没有要求*/           pros.put("group.id", "tank")           //设置为true 表示offset自动托管到kafka内部的一个特定名称为__consumer_offsets的topic           pros.put("enable.auto.commit", "false")           pros.put("auto.commit.interval.ms", "1000")           pros.put("max.poll.records", "5")           pros.put("session.timeout.ms", "30000")           //只有当offset不存在的时候,才用latest或者earliest           pros.put("auto.offset.reset", "latest")              pros.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")           pros.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")              val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](pros)              /*这里填写主题名称*/           consumer.subscribe(util.Arrays.asList("tank"))              val system = akka.actor.ActorSystem("system")           system.scheduler.schedule(0 seconds, 30 seconds)(tankTest.saveData(args,consumer))          }          object tankTest {           def saveData(args: Array[String],consumer: KafkaConsumer[String,String]): Unit = {               val records: ConsumerRecords[String, String] = consumer.poll(Duration.ofSeconds(3))               if (!records.isEmpty) {                   for (record 


【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3